###
### Construct treatment and control groups for ad-level DiD
###


library(dplyr)
library(purrr)
library(tidyr)
library(forcats)
library(stringr)
library(readr)
library(tibble)
library(data.table)
library(parallel)
library(lubridate)
library(hms)
library(magrittr)

# setDTthreads(0)      # enable multithreading in data.table if not using mclapply
setDTthreads(1)    # disable multithreading in data.table if using mclapply

### SET WORKING DIRECTORY HERE 
path_to_archive <- "replication/"
setwd(path_to_archive)


# load ads data
ads <- readRDS("data/final_ads.rds")

dim(ads)
# [1] 286863     65

table(ads$ad_type)
# cand_house      cand_pres    cand_senate cand_statewide        outside 
#      52051          76172          65776          22935          69929 

ads_nested <- ads %>% 
  group_by(Date) %>%
  nest %>% 
  arrange(Date)

# for timezone adjustments
load("data/dma_timezone.RData")
dma_timezone <- dma_timezone %>% 
	select(dma_code, timezone) %>%
	mutate(timezone = recode(timezone, ETZ = "America/New_York", CTZ = "America/Chicago", PTZ="America/Los_Angeles")) %>%
	filter(!is.na(dma_code)) %>%
	as.data.table

calls_match <- function(ad_channel, channel) (channel == ad_channel | channel == paste(ad_channel,"DT",sep=""))
rawfile_name <- function(date) paste0("data/rawxml/FWM_", year(date), str_pad(month(date), pad="0", width=2), str_pad(day(date), pad="0", width=2), "_R.pd.gz")
reffile_name <- function(date) paste0("data/ref_data/fwm_ref_data_", year(date), str_pad(month(date), pad="0", width=2), str_pad(day(date), pad="0", width=2), ".RData")

read_raw <- function(d) {
	cat("\tReading raw data...\n")	
	raw_view <- d %>% rawfile_name %>%
		fread( 
         sep="|", 
         col.names = c("mso", "device_id", "event_date", "event_time", "event_type", "event_value", "event_name", "event_id"),
         colClasses="character"
        ) %>%
		setnames(old=c("event_name", "event_value"), new=c("channel", "channel_num"))

	cat("\tConverting times and dates...\n")
	raw_view[,event_date := ymd(event_date)]
    raw_view[,event_time := hms(hours=as.numeric(substr(event_time,1,2)), minutes=as.numeric(substr(event_time, 3,4)), seconds=as.numeric(substr(event_time, 5,6)))]

	raw_view[channel_num == "65532", event_type := "O"]
	raw_view[event_type != "T", channel := "OFF"]

	ref_day <- reffile_name(d) %>% readRDS %>% 
		as.data.table %>%
		.[,c("device_id","dma_code","zipcode")] %>%
		.[dma_timezone, on="dma_code"]

	raw_view <- ref_day[raw_view,on="device_id"]
	
	# replace missing dmas with LEXINGTON KY (541)
	raw_view[is.na(dma_code), dma_code:="541"]
	raw_view[is.na(timezone), timezone:="America/New_York"]

	raw_view[timezone == "America/New_York", event_time_utc := ymd_hms(paste(event_date, event_time, sep=" "), tz="America/New_York") %>% with_tz("UTC")]
	raw_view[timezone == "America/Chicago", event_time_utc := ymd_hms(paste(event_date, event_time, sep=" "), tz="America/Chicago") %>% with_tz("UTC")]
	raw_view[timezone == "America/Los_Angeles", event_time_utc := ymd_hms(paste(event_date, event_time, sep=" "), tz="America/Los_Angeles") %>% with_tz("UTC")]
	raw_view[is.na(event_time_utc), event_time_utc := ymd_hms(paste(event_date, event_time, sep=" "), tz="America/New_York") %>% with_tz("UTC")]


	cat("\tAppending end-of-day events...\n")
	end_of_day <- data.table(
		event_date = d,
		event_time = hms(hours=rep(23,3),minutes=rep(59,3),seconds=rep(59,3)),
		timezone = c("America/New_York", "America/Chicago", "America/Los_Angeles"))
	end_of_day[, event_time_utc := imap(timezone, ~ ymd_hms(paste(event_date[.y], event_time[.y], sep=" "), tz = .x) %>% with_tz("UTC")) %>% reduce(c) ]


	every_dev_end <- unique(raw_view, by = "device_id")[,.(device_id,dma_code,zipcode,timezone,mso,event_id)]
	every_dev_end[,event_type:="E"]
	every_dev_end[,event_id:=paste(device_id, "END", sep="_")]
	every_dev_end[,channel_num:=0]
	every_dev_end[,channel:="END"]

	every_dev_end <- end_of_day[every_dev_end, on="timezone"]

	raw_view <- rbind(raw_view, every_dev_end)
	raw_view <- raw_view[order(device_id, event_time_utc),]

	cat("\tGenerating viewing intervals...\n")
	raw_view[, event_time_utc_end := lead(event_time_utc), by = .(device_id)]
	raw_view <- raw_view[!is.na(event_time_utc_end)]

	## cap segment time at 5h (18000s), ~ 99th percentile
	raw_view[event_type=="T", dur := time_length(event_time_utc_end - event_time_utc, unit="second")]
	raw_view[dur > 18000, `:=` (dur = 18000, event_time_utc_end = event_time_utc + dseconds(18000)) ]

	raw_view[event_type=="T" & dur >= 1]
}

get_t_c_groups <- function(ad_time_utc, ad_channel, tune_data, window_restrict=NULL) {
	# cat("\t", ad_channel, as.character(ad_time_utc), "\n")

	setkeyv(tune_data, c("device_id","event_time_utc"))

	### treatment group: tuned in to channel at ad time, 
	### tuning event happened in last window_restrict mins
	active_at_ad_time <- tune_data[event_time_utc <= ad_time_utc &
					 			   event_time_utc_end > ad_time_utc]

    ## compute time differences between tune in / out event and ad time
    # tunein_delta negative for treatment, positive for C2
    # tuneout_delta positive for treatment, negative for C1
	active_at_ad_time[,`:=` (tunein_delta = time_length(event_time_utc - ad_time_utc, unit="seconds"),
							 tuneout_delta = time_length(event_time_utc_end - ad_time_utc, unit="seconds"))]
	
	## set of all devices that were tuned to the ad channel at ad time
	saw_ad <- active_at_ad_time[calls_match(ad_channel, channel), .(device_id)]

	## T group is the subset of devices tuned to the ad channel at ad time
	## that tuned in <= window_restrict mins prior
	t_group <- active_at_ad_time[calls_match(ad_channel, channel) & 
								 time_length(ad_time_utc - event_time_utc, unit="minutes") <= window_restrict, 
					 .(device_id, tunein_delta, tuneout_delta)]
	t_group[, group := "T"]

	### control groups: viewed same program, not in T group
	## tuned in within window_restrict minutes of ad time
	## and device active at time of ad airing
	## use half hour block surrounding the ad time to define program

	if (!is.null(window_restrict)) {
		prog_start_time <- max(floor_date(ad_time_utc, "30 minutes"), ad_time_utc - minutes(window_restrict))
		prog_end_time <- min(ceiling_date(ad_time_utc, "30 minutes"), ad_time_utc + minutes(window_restrict))
	} else {
		prog_start_time <- floor_date(ad_time_utc, "30 minutes")
		prog_end_time <- ceiling_date(ad_time_utc, "30 minutes")
	}
	
	# C group is devices that tuned to ad channel no more than window_restrict prior to ad time
	# and tuned out no more than window_restrict before (C1) or after (C2)
	c_group <- tune_data[calls_match(ad_channel, channel) & 
			    		 event_time_utc     <= prog_end_time &
			    		 event_time_utc_end >  prog_start_time &
			    		 time_length(ad_time_utc - event_time_utc, unit="minutes") <= window_restrict,] %>%
						 .[!eval(saw_ad$device_id)] %>%
						 .[eval(unique(active_at_ad_time$device_id)),nomatch=0] %>%
						 .[,`:=` (group = ifelse(event_time_utc > ad_time_utc, "C2", "C1"),
						 		  tuneout_delta = time_length(event_time_utc_end - ad_time_utc, unit="seconds"),
						 		  tunein_delta = time_length(event_time_utc - ad_time_utc, unit="seconds"))] %>%
						 .[order(device_id, event_time * if_else(group=="C1", -1, 1))] %>% # deal with multiple events by same device by picking closest to ad time
						 unique(by = c("device_id", "group")) %>%  
						 .[,.(group = paste(sort(unique(group)), collapse="+"),
						 	  tunein_delta = max(tunein_delta),
						 	  tuneout_delta = min(tuneout_delta)),by=.(device_id)]

	devs <- rbind(t_group, c_group)

	# compute time watching show on which the ad ran
	this_show_view <- tune_data[calls_match(ad_channel, channel) & 
			    		 		event_time_utc     <= prog_end_time &
			    		 		event_time_utc_end >  prog_start_time] %>%
			    		 		.[devs, on = "device_id"] %>%
			    		 		.[,`:=` (event_time_utc = pmax(prog_start_time, event_time_utc),
			    		 			    event_time_utc_end = pmin(prog_end_time, event_time_utc_end))] %>%
			    		 		.[,.(adshow_time = sum(time_length(event_time_utc_end-event_time_utc, unit="minutes"))),by=.(device_id)]

	this_show_view[devs, on = .(device_id)]
}


day_loop <- function(date, ads) {
	cat(as.character(date), "\n")
	tune_day <- read_raw(date)

	## identify T / C devices
	cat("\tIdentifying T and C groups by ad...\n")
	
	t_c_by_ad <- ads %>% 
		select(dma_code, channel, affiliate, program, event_time_utc) %>%
		mutate(t_c = map2(event_time_utc, channel, get_t_c_groups, tune_data=tune_day, window_restrict=10))


	saveRDS(t_c_by_ad, file=paste0("data/dd/tc_groups_", as.character(date), ".rds"))

}

ads_split <- split(ads_nested, f = 1:24)
dd_estimates <- mclapply(ads_split, FUN = function(d) walk2(d$Date, d$data, day_loop), mc.cores=24)